1 package org.apache.lucene.index;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.IOException;
21 import java.text.NumberFormat;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.Locale;
26 import java.util.Set;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.lucene.analysis.Analyzer;
30 import org.apache.lucene.codecs.Codec;
31 import org.apache.lucene.index.DocumentsWriterDeleteQueue.DeleteSlice;
32 import org.apache.lucene.search.similarities.Similarity;
33 import org.apache.lucene.store.Directory;
34 import org.apache.lucene.store.FlushInfo;
35 import org.apache.lucene.store.IOContext;
36 import org.apache.lucene.store.TrackingDirectoryWrapper;
37 import org.apache.lucene.util.ByteBlockPool.Allocator;
38 import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
39 import org.apache.lucene.util.Counter;
40 import org.apache.lucene.util.InfoStream;
41 import org.apache.lucene.util.IntBlockPool;
42 import org.apache.lucene.util.MutableBits;
43 import org.apache.lucene.util.RamUsageEstimator;
44 import org.apache.lucene.util.StringHelper;
45 import org.apache.lucene.util.Version;
46
47 import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
48 import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
49
50 class DocumentsWriterPerThread {
51
52
53
54
55
56
57 abstract static class IndexingChain {
58 abstract DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException;
59 }
60
61
62 static final IndexingChain defaultIndexingChain = new IndexingChain() {
63
64 @Override
65 DocConsumer getChain(DocumentsWriterPerThread documentsWriterPerThread) throws IOException {
66 return new DefaultIndexingChain(documentsWriterPerThread);
67 }
68 };
69
70 static class DocState {
71 final DocumentsWriterPerThread docWriter;
72 Analyzer analyzer;
73 InfoStream infoStream;
74 Similarity similarity;
75 int docID;
76 Iterable<? extends IndexableField> doc;
77
78 DocState(DocumentsWriterPerThread docWriter, InfoStream infoStream) {
79 this.docWriter = docWriter;
80 this.infoStream = infoStream;
81 }
82
83 public void testPoint(String name) {
84 docWriter.testPoint(name);
85 }
86
87 public void clear() {
88
89
90 doc = null;
91 analyzer = null;
92 }
93 }
94
95 static class FlushedSegment {
96 final SegmentCommitInfo segmentInfo;
97 final FieldInfos fieldInfos;
98 final FrozenBufferedUpdates segmentUpdates;
99 final MutableBits liveDocs;
100 final int delCount;
101
102 private FlushedSegment(SegmentCommitInfo segmentInfo, FieldInfos fieldInfos,
103 BufferedUpdates segmentUpdates, MutableBits liveDocs, int delCount) {
104 this.segmentInfo = segmentInfo;
105 this.fieldInfos = fieldInfos;
106 this.segmentUpdates = segmentUpdates != null && segmentUpdates.any() ? new FrozenBufferedUpdates(segmentUpdates, true) : null;
107 this.liveDocs = liveDocs;
108 this.delCount = delCount;
109 }
110 }
111
112
113
114
115
116 void abort() {
117
118 aborted = true;
119 try {
120 if (infoStream.isEnabled("DWPT")) {
121 infoStream.message("DWPT", "now abort");
122 }
123 try {
124 consumer.abort();
125 } catch (Throwable t) {
126 }
127
128 pendingUpdates.clear();
129 } finally {
130 if (infoStream.isEnabled("DWPT")) {
131 infoStream.message("DWPT", "done abort");
132 }
133 }
134 }
135 private final static boolean INFO_VERBOSE = false;
136 final Codec codec;
137 final TrackingDirectoryWrapper directory;
138 final Directory directoryOrig;
139 final DocState docState;
140 final DocConsumer consumer;
141 final Counter bytesUsed;
142
143 SegmentWriteState flushState;
144
145 final BufferedUpdates pendingUpdates;
146 private final SegmentInfo segmentInfo;
147 boolean aborted = false;
148
149 private final FieldInfos.Builder fieldInfos;
150 private final InfoStream infoStream;
151 private int numDocsInRAM;
152 final DocumentsWriterDeleteQueue deleteQueue;
153 private final DeleteSlice deleteSlice;
154 private final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
155 final Allocator byteBlockAllocator;
156 final IntBlockPool.Allocator intBlockAllocator;
157 private final AtomicLong pendingNumDocs;
158 private final LiveIndexWriterConfig indexWriterConfig;
159 private final boolean enableTestPoints;
160 private final IndexWriter indexWriter;
161
162 public DocumentsWriterPerThread(IndexWriter writer, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
163 FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
164 this.indexWriter = writer;
165 this.directoryOrig = directoryOrig;
166 this.directory = new TrackingDirectoryWrapper(directory);
167 this.fieldInfos = fieldInfos;
168 this.indexWriterConfig = indexWriterConfig;
169 this.infoStream = infoStream;
170 this.codec = indexWriterConfig.getCodec();
171 this.docState = new DocState(this, infoStream);
172 this.docState.similarity = indexWriterConfig.getSimilarity();
173 this.pendingNumDocs = pendingNumDocs;
174 bytesUsed = Counter.newCounter();
175 byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
176 pendingUpdates = new BufferedUpdates();
177 intBlockAllocator = new IntBlockAllocator(bytesUsed);
178 this.deleteQueue = deleteQueue;
179 assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
180 pendingUpdates.clear();
181 deleteSlice = deleteQueue.newSlice();
182
183 segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, segmentName, -1, false, codec, Collections.<String,String>emptyMap(), StringHelper.randomId(), new HashMap<String,String>());
184 assert numDocsInRAM == 0;
185 if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
186 infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
187 }
188
189
190 consumer = indexWriterConfig.getIndexingChain().getChain(this);
191 this.enableTestPoints = enableTestPoints;
192 }
193
194 public FieldInfos.Builder getFieldInfosBuilder() {
195 return fieldInfos;
196 }
197
198 final void testPoint(String message) {
199 if (enableTestPoints) {
200 assert infoStream.isEnabled("TP");
201 infoStream.message("TP", message);
202 }
203 }
204
205
206
207 private void reserveOneDoc() {
208 if (pendingNumDocs.incrementAndGet() > IndexWriter.getActualMaxDocs()) {
209
210 pendingNumDocs.decrementAndGet();
211 throw new IllegalArgumentException("number of documents in the index cannot exceed " + IndexWriter.getActualMaxDocs());
212 }
213 }
214
215 public void updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
216 testPoint("DocumentsWriterPerThread addDocument start");
217 assert deleteQueue != null;
218 reserveOneDoc();
219 docState.doc = doc;
220 docState.analyzer = analyzer;
221 docState.docID = numDocsInRAM;
222 if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
223 infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
224 }
225
226
227
228
229
230
231 boolean success = false;
232 try {
233 try {
234 consumer.processDocument();
235 } finally {
236 docState.clear();
237 }
238 success = true;
239 } finally {
240 if (!success) {
241
242 deleteDocID(docState.docID);
243 numDocsInRAM++;
244 }
245 }
246 finishDocument(delTerm);
247 }
248
249 public int updateDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs, Analyzer analyzer, Term delTerm) throws IOException, AbortingException {
250 testPoint("DocumentsWriterPerThread addDocuments start");
251 assert deleteQueue != null;
252 docState.analyzer = analyzer;
253 if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
254 infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + delTerm + " docID=" + docState.docID + " seg=" + segmentInfo.name);
255 }
256 int docCount = 0;
257 boolean allDocsIndexed = false;
258 try {
259 for(Iterable<? extends IndexableField> doc : docs) {
260
261
262
263
264
265
266 reserveOneDoc();
267 docState.doc = doc;
268 docState.docID = numDocsInRAM;
269 docCount++;
270
271 boolean success = false;
272 try {
273 consumer.processDocument();
274 success = true;
275 } finally {
276 if (!success) {
277
278
279 numDocsInRAM++;
280 }
281 }
282 finishDocument(null);
283 }
284 allDocsIndexed = true;
285
286
287
288
289 if (delTerm != null) {
290 deleteQueue.add(delTerm, deleteSlice);
291 assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
292 deleteSlice.apply(pendingUpdates, numDocsInRAM-docCount);
293 }
294
295 } finally {
296 if (!allDocsIndexed && !aborted) {
297
298
299 int docID = numDocsInRAM-1;
300 final int endDocID = docID - docCount;
301 while (docID > endDocID) {
302 deleteDocID(docID);
303 docID--;
304 }
305 }
306 docState.clear();
307 }
308
309 return docCount;
310 }
311
312 private void finishDocument(Term delTerm) {
313
314
315
316
317
318
319
320
321 boolean applySlice = numDocsInRAM != 0;
322 if (delTerm != null) {
323 deleteQueue.add(delTerm, deleteSlice);
324 assert deleteSlice.isTailItem(delTerm) : "expected the delete term as the tail item";
325 } else {
326 applySlice &= deleteQueue.updateSlice(deleteSlice);
327 }
328
329 if (applySlice) {
330 deleteSlice.apply(pendingUpdates, numDocsInRAM);
331 } else {
332 deleteSlice.reset();
333 }
334 ++numDocsInRAM;
335 }
336
337
338
339 void deleteDocID(int docIDUpto) {
340 pendingUpdates.addDocID(docIDUpto);
341
342
343
344
345
346
347
348
349
350 }
351
352
353
354
355 public int numDeleteTerms() {
356
357 return pendingUpdates.numTermDeletes.get();
358 }
359
360
361
362
363 public int getNumDocsInRAM() {
364
365 return numDocsInRAM;
366 }
367
368
369
370
371
372
373
374 FrozenBufferedUpdates prepareFlush() {
375 assert numDocsInRAM > 0;
376 final FrozenBufferedUpdates globalUpdates = deleteQueue.freezeGlobalBuffer(deleteSlice);
377
378
379 if (deleteSlice != null) {
380
381 deleteSlice.apply(pendingUpdates, numDocsInRAM);
382 assert deleteSlice.isEmpty();
383 deleteSlice.reset();
384 }
385 return globalUpdates;
386 }
387
388
389 FlushedSegment flush() throws IOException, AbortingException {
390 assert numDocsInRAM > 0;
391 assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
392 segmentInfo.setMaxDoc(numDocsInRAM);
393 final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
394 pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
395 final double startMBUsed = bytesUsed() / 1024. / 1024.;
396
397
398
399
400 if (pendingUpdates.docIDs.size() > 0) {
401 flushState.liveDocs = codec.liveDocsFormat().newLiveDocs(numDocsInRAM);
402 for(int delDocID : pendingUpdates.docIDs) {
403 flushState.liveDocs.clear(delDocID);
404 }
405 flushState.delCountOnFlush = pendingUpdates.docIDs.size();
406 pendingUpdates.bytesUsed.addAndGet(-pendingUpdates.docIDs.size() * BufferedUpdates.BYTES_PER_DEL_DOCID);
407 pendingUpdates.docIDs.clear();
408 }
409
410 if (aborted) {
411 if (infoStream.isEnabled("DWPT")) {
412 infoStream.message("DWPT", "flush: skip because aborting is set");
413 }
414 return null;
415 }
416
417 if (infoStream.isEnabled("DWPT")) {
418 infoStream.message("DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
419 }
420
421 try {
422 consumer.flush(flushState);
423 pendingUpdates.terms.clear();
424 segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));
425
426 final SegmentCommitInfo segmentInfoPerCommit = new SegmentCommitInfo(segmentInfo, 0, -1L, -1L, -1L);
427 if (infoStream.isEnabled("DWPT")) {
428 infoStream.message("DWPT", "new segment has " + (flushState.liveDocs == null ? 0 : flushState.delCountOnFlush) + " deleted docs");
429 infoStream.message("DWPT", "new segment has " +
430 (flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors") + "; " +
431 (flushState.fieldInfos.hasNorms() ? "norms" : "no norms") + "; " +
432 (flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues") + "; " +
433 (flushState.fieldInfos.hasProx() ? "prox" : "no prox") + "; " +
434 (flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
435 infoStream.message("DWPT", "flushedFiles=" + segmentInfoPerCommit.files());
436 infoStream.message("DWPT", "flushed codec=" + codec);
437 }
438
439 final BufferedUpdates segmentDeletes;
440 if (pendingUpdates.queries.isEmpty() && pendingUpdates.numericUpdates.isEmpty() && pendingUpdates.binaryUpdates.isEmpty()) {
441 pendingUpdates.clear();
442 segmentDeletes = null;
443 } else {
444 segmentDeletes = pendingUpdates;
445 }
446
447 if (infoStream.isEnabled("DWPT")) {
448 final double newSegmentSize = segmentInfoPerCommit.sizeInBytes()/1024./1024.;
449 infoStream.message("DWPT", "flushed: segment=" + segmentInfo.name +
450 " ramUsed=" + nf.format(startMBUsed) + " MB" +
451 " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
452 " docs/MB=" + nf.format(flushState.segmentInfo.maxDoc() / newSegmentSize));
453 }
454
455 assert segmentInfo != null;
456
457 FlushedSegment fs = new FlushedSegment(segmentInfoPerCommit, flushState.fieldInfos,
458 segmentDeletes, flushState.liveDocs, flushState.delCountOnFlush);
459 sealFlushedSegment(fs);
460
461 return fs;
462 } catch (Throwable th) {
463 abort();
464 throw AbortingException.wrap(th);
465 }
466 }
467
468 private final Set<String> filesToDelete = new HashSet<>();
469
470 public Set<String> pendingFilesToDelete() {
471 return filesToDelete;
472 }
473
474
475
476
477 void sealFlushedSegment(FlushedSegment flushedSegment) throws IOException {
478 assert flushedSegment != null;
479
480 SegmentCommitInfo newSegment = flushedSegment.segmentInfo;
481
482 IndexWriter.setDiagnostics(newSegment.info, IndexWriter.SOURCE_FLUSH);
483
484 IOContext context = new IOContext(new FlushInfo(newSegment.info.maxDoc(), newSegment.sizeInBytes()));
485
486 boolean success = false;
487 try {
488
489 if (indexWriterConfig.getUseCompoundFile()) {
490 Set<String> originalFiles = newSegment.info.files();
491
492 indexWriter.createCompoundFile(infoStream, new TrackingDirectoryWrapper(directory), newSegment.info, context);
493 filesToDelete.addAll(originalFiles);
494 newSegment.info.setUseCompoundFile(true);
495 }
496
497
498
499
500
501 codec.segmentInfoFormat().write(directory, newSegment.info, context);
502
503
504
505
506
507
508
509 if (flushedSegment.liveDocs != null) {
510 final int delCount = flushedSegment.delCount;
511 assert delCount > 0;
512 if (infoStream.isEnabled("DWPT")) {
513 infoStream.message("DWPT", "flush: write " + delCount + " deletes gen=" + flushedSegment.segmentInfo.getDelGen());
514 }
515
516
517
518
519
520
521
522
523
524
525 SegmentCommitInfo info = flushedSegment.segmentInfo;
526 Codec codec = info.info.getCodec();
527 codec.liveDocsFormat().writeLiveDocs(flushedSegment.liveDocs, directory, info, delCount, context);
528 newSegment.setDelCount(delCount);
529 newSegment.advanceDelGen();
530 }
531
532 success = true;
533 } finally {
534 if (!success) {
535 if (infoStream.isEnabled("DWPT")) {
536 infoStream.message("DWPT",
537 "hit exception creating compound file for newly flushed segment " + newSegment.info.name);
538 }
539 }
540 }
541 }
542
543
544 SegmentInfo getSegmentInfo() {
545 return segmentInfo;
546 }
547
548 long bytesUsed() {
549 return bytesUsed.get() + pendingUpdates.bytesUsed.get();
550 }
551
552
553
554 final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
555
556
557
558 final static int MAX_TERM_LENGTH_UTF8 = BYTE_BLOCK_SIZE-2;
559
560
561 private static class IntBlockAllocator extends IntBlockPool.Allocator {
562 private final Counter bytesUsed;
563
564 public IntBlockAllocator(Counter bytesUsed) {
565 super(IntBlockPool.INT_BLOCK_SIZE);
566 this.bytesUsed = bytesUsed;
567 }
568
569
570 @Override
571 public int[] getIntBlock() {
572 int[] b = new int[IntBlockPool.INT_BLOCK_SIZE];
573 bytesUsed.addAndGet(IntBlockPool.INT_BLOCK_SIZE
574 * RamUsageEstimator.NUM_BYTES_INT);
575 return b;
576 }
577
578 @Override
579 public void recycleIntBlocks(int[][] blocks, int offset, int length) {
580 bytesUsed.addAndGet(-(length * (IntBlockPool.INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT)));
581 }
582
583 }
584
585 @Override
586 public String toString() {
587 return "DocumentsWriterPerThread [pendingDeletes=" + pendingUpdates
588 + ", segment=" + (segmentInfo != null ? segmentInfo.name : "null") + ", aborted=" + aborted + ", numDocsInRAM="
589 + numDocsInRAM + ", deleteQueue=" + deleteQueue + "]";
590 }
591
592 }